{{define "content"}}
<div class="row">
  <div class="container">
    <div class="col-md-12">
      <h1>{{.title}}</h1>

      <div class="panel panel-default">
        <div class="panel panel-heading">
          <h3>Partition Table (State)</h3>
        </div>

        <div class="panel-body">
          <table class="table table-striped">
            <thead>
              <tr>
                <th title="Partition" rowspan="2">Partition</th>
                <th title="Status of the processor" rowspan="2">State</th>
                <th title="Stats during recovery" colspan="3">Recovery</th>
                <th title="Stats during run time" colspan="2">Writes</th>
              </tr>
              <tr>
                <th title="Time of passed recovery so far">Time</th>
                <th title="Number of messages recovered per second">Rate</th>
                <th title="Number of messages lagging behind HWM">Lag</th>
                <th title="Number of messages updated per second">Rate</th>
                <th title="Number of bytes updated per second">Bytes</th>
              </tr>
            </thead>
            <tbody id="tableStatistics">
            </tbody>
          </table>
        </div>
      </div>

      <div class="panel panel-default">
        <div class="panel panel-heading">
          <h3>Joins/Lookups</h3>
        </div>
        <div class="panel-body">
          <table class="table table-striped">
            <thead>
              <tr>
                <th title="Type of table (join or lookup)" rowspan="2">Type</th>
                <th title="Topic" rowspan="2">Topic</th>
                <th title="Partition" rowspan="2">Partition</th>
                <th title="Status of the Table" rowspan="2">State</th>
                <th title="Stats during recovery" colspan="3">Recovery</th>
                <th title="Stats during run time" colspan="4">Incoming</th>
              </tr>
              <tr>
                <th title="Time of passed recovery so far">Time</th>
                <th title="Number of messages recovered per second">Rate</th>
                <th title="Number of messages lagging behind HWM">Lag</th>
                <th title="Number of messages updated per second">Rate</th>
                <th title="Number of bytes updated per second">Bytes</th>
                <th title="Number of offsets the view is lagging behind">Offset Lag</th>
                <th title="Time delay the view is lagging behind">Time Lag</th>
              </tr>
            </thead>
            <tbody id="joinStats">
            </tbody>
          </table>
        </div>
      </div>

      <div class="panel panel-default">
        <div class="panel panel-heading">
          <h3>Input Traffic</h3>
          <ul id="inputTopics">
          </ul>

        </div>
        <div class="panel-body">
          <table class="table table-striped">
            <thead>
              <tr>
                <th title="Topic" rowspan="2">Topic</th>
                <th title="Last Offset" rowspan="2">Offset (min/avg/max)</th>
                <th title="Rate of events" colspan="2">Event Rate</th>
                <th title="Rate of bytes" colspan="2">Byte Rate</th>
                <th title="Lag for offset and time" colspan="3">Lag (offset/time)</th>
              </tr>
              <tr>
                <th title="Event rate in total">Total</th>
                <th title="Event rate per partition">Per Partition</th>
                <th title="Byte rate in total">Total</th>
                <th title="Byte rate per partition">Per Partition</th>
                <th title="Min Lag">Min</th>
                <th title="Avg Lag">Avg</th>
                <th title="Max Lag">Max</th>
              </tr>
            </thead>
            <tbody id="inputStatistics">
            </tbody>
          </table>
        </div>
      </div>

      <div class="panel panel-default">
        <div class="panel panel-heading">
          <h3>Outgoing Traffic</h3>
          <ul id="outputTopics">
          </ul>

        </div>
        <div class="panel-body">
          <table class="table table-striped">
            <thead>
              <tr>
                <th title="Topic" rowspan="2">Topic</th>
                <th title="Rate of events" colspan="2">Event Rate</th>
                <th title="Rate of bytes" colspan="2">Byte Rate</th>
              </tr>
              <tr>
                <th title="Event rate in total">Total</th>
                <th title="Event rate per partition">Per Partition</th>
                <th title="Byte rate in total">Total</th>
                <th title="Byte rate per partition">Per Partition</th>
              </tr>
            </thead>
            <tbody id="outputStatistics">
            </tbody>
          </table>
        </div>
      </div>

      <script type="text/javascript">
        // Per-element caches (d3.local): the render functions below store the
        // values they displayed for a table row here, so that the next refresh
        // can compute per-second rates from the difference.
        var lastInputStats = d3.local(),
          lastOutputStats = d3.local(),
          lastTableStats = d3.local(),
          lastJoinStats = d3.local();

        // Display names, looked up by the `Status` value of a stats object.
        var statusMap = {
          "0": "stopped",
          "1": "initializing",
          "2": "connecting",
          "3": "recovering",
          "4": "preparing",
          "5": "running"
        };

        // Display names, looked up by the `RunMode` value of a stats object.
        var runModeMap = {
          "0": "active",
          "1": "standby",
          "2": "recover-only"
        };

        // Number of nanoseconds in one second.
        var secsToNano = 1e9;

        var renderDetails = function(partitions) {

          if (partitions == null) {
            return;
          }

          // Builds the cells for one row of the "Partition Table (State)" table.
          //
          // d3 invokes this with `this` bound to the row element. The stats shown
          // for that element are stored in `lastTableStats`, so the following
          // refresh can derive per-second rates from the difference.
          //
          // stats: table stats of one partition, extended with `Now` (a Date) and
          //        `partition` (the numeric partition ID).
          // Returns the row's inner HTML as a string.
          var updateTableStatistics = function(stats) {

            var status = statusMap[stats.Status];
            var runMode = runModeMap[stats.RunMode];

            // a stalled partition is reported as such, whatever its status code
            if (stats.Stalled) {
              status = "stalled";
            }

            // presumably RecoveryTime is empty while recovery has not finished
            // (TODO confirm); in that case measure the duration up to stats.Now
            var recEndTime = stats.Recovery.RecoveryTime != "" ? stats.Recovery.RecoveryTime : stats.Now;
            var recoveryTime = (new Date(recEndTime) - new Date(stats.Recovery.StartTime)) / 1000.0;
            var recoveryRate = 0;
            var recoveryLag = stats.Recovery.Hwm - stats.Recovery.Offset;

            var writeRate = 0;
            var writeBytes = 0;

            // let's check if we have a previous dataset and set them to their
            // correct values
            var lastStats = lastTableStats.get(this);

            if (lastStats) {
              // diff to previous stats in seconds
              var timeDiff = (stats.Now - lastStats.Now) / 1000.0;

              // Only derive rates from a positive, valid interval. Two snapshots
              // with the same (or an unparsable) timestamp would otherwise
              // divide by zero/NaN and render "NaN" or "Infinity".
              if (timeDiff > 0) {
                if (status == "recovering") {
                  recoveryRate = (stats.Recovery.Offset - lastStats.Recovery.Offset) / timeDiff;
                }

                writeRate = (stats.Writes.Count - lastStats.Writes.Count) / timeDiff;
                writeBytes = (stats.Writes.Bytes - lastStats.Writes.Bytes) / timeDiff;
              }
            }
            // remember the current stats for the next round
            lastTableStats.set(this, stats);

            return '<td>' + stats.partition.toFixed(0) + '</td>\n' +
              '<td>' + status + " (" + runMode + ')</td>\n' +
              '<td>' + recoveryTime + 's</td>\n' +
              '<td>' + recoveryRate.toFixed(0) + '</td>\n' +
              '<td>' + recoveryLag + '</td>\n' +
              '<td>' + writeRate.toFixed(0) + '</td>\n' +
              '<td>' + writeBytes.toFixed(0) + '</td>\n';
          };

          // Collect the table stats of every partition that has them, each
          // extended with the partition ID and the time the stats were taken.
          //
          // The partition ID is the key of the entry in `partitions.Group`, so it
          // must be read while iterating the object itself: filtering first turns
          // the object into a plain array, and a following map() would then see
          // array indexes instead of partition IDs.
          var partitionList = _.chain(partitions.Group)
            .map(function(v, partId) {
              // stateless processors don't have table stats
              if (v.TableStats == null) {
                return null;
              }
              return _.assign({},
                v.TableStats, {
                  'Now': new Date(v.Now),
                  'partition': parseInt(partId, 10)
                }
              );
            })
            .filter(function(v) {
              return v != null;
            })
            .sortBy('partition')
            .value();

          // update, enter and remove data
          var d = d3.select("#tableStatistics").selectAll(".partitionbox").data(partitionList);
          d.html(updateTableStatistics);
          d.enter().append("tr").classed("partitionbox", true).html(updateTableStatistics);
          d.exit().remove();

          // Builds the cells for one row of the "Input Traffic" table.
          //
          // d3 invokes this with `this` bound to the row element and `data` set to
          // a [topic, stats] pair. `stats` holds one list of values per metric
          // (Count, Bytes, OffsetLag, LastOffset, Delay).
          // Returns the row's inner HTML as a string.
          var renderInput = function(data) {
            var NANOS_PER_SECOND = 1000000000;
            var topic = data[0];
            var metrics = data[1];

            // Totals and averages of this refresh. The snapshot is kept in
            // lastInputStats so the next refresh can turn counters into rates.
            var snapshot = {
              "now": new Date(),
              "countSum": _.sum(metrics.Count),
              "countAvg": _.mean(metrics.Count),
              "bytesSum": _.sum(metrics.Bytes),
              "bytesAvg": _.mean(metrics.Bytes)
            };

            var previous = lastInputStats.get(this);
            lastInputStats.set(this, snapshot);

            // seconds since the previous snapshot; 0 when there is none yet
            var elapsed = previous ? (snapshot.now - previous.now) / 1000.0 : 0;

            // change of a snapshot field per second, or 0 without a usable interval
            var ratePerSecond = function(field) {
              if (!elapsed) {
                return 0;
              }
              return (snapshot[field] - previous[field]) / elapsed;
            };

            // "<offset lag>/<delay in seconds>s" for one aggregate (min, mean or max)
            var lagCell = function(aggregate) {
              var delaySeconds = aggregate(metrics.Delay).toFixed(0) / NANOS_PER_SECOND;
              return aggregate(metrics.OffsetLag).toFixed(0) + '/' + delaySeconds.toFixed(2) + 's';
            };
            var minLag = lagCell(_.min);
            var meanLag = lagCell(_.mean);
            var maxLag = lagCell(_.max);

            var offsets = [
              _.min(metrics.LastOffset).toFixed(0),
              _.mean(metrics.LastOffset).toFixed(0),
              _.max(metrics.LastOffset).toFixed(0)
            ].join("/");

            var cells = [
              topic,
              offsets,
              ratePerSecond("countSum").toFixed(0),
              ratePerSecond("countAvg").toFixed(0),
              ratePerSecond("bytesSum").toFixed(0),
              ratePerSecond("bytesAvg").toFixed(0),
              minLag,
              meanLag,
              maxLag
            ];

            return _.map(cells, function(cell) {
              return '<td>' + cell + '</td>\n';
            }).join('');
          };

          // working on the "Input" object here:
          /*
          {
            "Group": {
              "0": {
                 .....
                "Input": {
                  "user-click": {"Count": 12, ...}
                  }
                },
              },
            }
          */
          // Regroups per-partition topic statistics into per-topic value lists.
          //
          // partitionGroup: collection of partitions; each partition may hold,
          //                 under the property named by `source`, an object that
          //                 maps topic names to stats objects.
          // source:         name of the partition property to read (e.g. "Input").
          // statsValues:    names of the stats fields to collect.
          //
          // Returns an object keyed by topic name. Each value maps every name in
          // statsValues to a list holding that field's value from each stats
          // object found for the topic, e.g.
          //   {"user-click": {"Count": [12, 13, ...], ...}}
          //
          // Note: every stats object is tagged in place with a "_topic" property.
          var aggregateStatsByTopic = function(partitionGroup, source, statsValues) {

            // (1) flatten everything into one list of stats objects and remember
            //     on each object which topic it belongs to
            var taggedStats = _.flatMap(partitionGroup, function(partition) {
              return _.map(partition[source], function(stats, topic) {
                stats["_topic"] = topic;
                return stats;
              });
            });

            // (2) bucket the stats objects by topic name:
            //     {"user-click": [{"Count": 12, ...}, {"Count": 13, ...}, ...]}
            var statsByTopic = _.groupBy(taggedStats, "_topic");

            // (3) for each topic, transform the list of stats objects into one
            //     object with a list of values per requested field
            return _.mapValues(statsByTopic, function(statsOfTopic) {
              // start with an empty list for every requested field
              var emptyLists = _.map(statsValues, function() {
                return [];
              });
              var valueLists = _.zipObject(statsValues, emptyLists);

              _.forEach(statsOfTopic, function(stats) {
                _.forEach(_.keys(valueLists), function(field) {
                  valueLists[field].push(stats[field]);
                });
              });

              return valueLists;
            });
          };

          // One [topic, stats] pair per input topic; stats holds, for every listed
          // field, the list of values (one for each partition).
          var inputFields = ["Count", "Bytes", "OffsetLag", "LastOffset", "Delay"];
          var inputRows = _.toPairs(aggregateStatsByTopic(partitions.Group, "Input", inputFields));

          // Rows of the input box are matched to their data by topic name.
          var inputTopicOf = function(row) {
            return row[0];
          };
          var inputSelection = d3.select("#inputStatistics")
            .selectAll(".partitionbox")
            .data(inputRows, inputTopicOf);

          // add rows for new topics, refresh the existing ones, drop the stale ones
          inputSelection.enter().append("tr").classed("partitionbox", true).html(renderInput);
          inputSelection.html(renderInput);
          inputSelection.exit().remove();


          // Builds the cells for one row of the "Outgoing Traffic" table.
          //
          // d3 invokes this with `this` bound to the row element and `data` set to
          // a [topic, stats] pair. `stats` holds one list of values per metric
          // (Count, Bytes).
          // Returns the row's inner HTML as a string.
          var renderOutput = function(data) {
            var topic = data[0];
            var metrics = data[1];

            // Totals and averages of this refresh. The snapshot is kept in
            // lastOutputStats so the next refresh can turn counters into rates.
            var snapshot = {
              "now": new Date(),
              "countSum": _.sum(metrics.Count),
              "countAvg": _.mean(metrics.Count),
              "bytesSum": _.sum(metrics.Bytes),
              "bytesAvg": _.mean(metrics.Bytes)
            };

            var previous = lastOutputStats.get(this);
            lastOutputStats.set(this, snapshot);

            // seconds since the previous snapshot; 0 when there is none yet
            var elapsed = previous ? (snapshot.now - previous.now) / 1000.0 : 0;

            // change of a snapshot field per second, or 0 without a usable interval
            var ratePerSecond = function(field) {
              if (!elapsed) {
                return 0;
              }
              return (snapshot[field] - previous[field]) / elapsed;
            };

            var cells = [
              topic,
              ratePerSecond("countSum").toFixed(0),
              ratePerSecond("countAvg").toFixed(0),
              ratePerSecond("bytesSum").toFixed(0),
              ratePerSecond("bytesAvg").toFixed(0)
            ];

            return _.map(cells, function(cell) {
              return '<td>' + cell + '</td>\n';
            }).join('');
          };

          // One [topic, stats] pair per output topic, rendered into the output box.
          var outputFields = ["Count", "Bytes"];
          var outputRows = _.toPairs(aggregateStatsByTopic(partitions.Group, "Output", outputFields));

          // Rows of the output box are matched to their data by topic name.
          var outputTopicOf = function(row) {
            return row[0];
          };
          var outputSelection = d3.select("#outputStatistics")
            .selectAll(".partitionbox")
            .data(outputRows, outputTopicOf);

          // add rows for new topics, refresh the existing ones, drop the stale ones
          outputSelection.enter().append("tr").classed("partitionbox", true).html(renderOutput);
          outputSelection.html(renderOutput);
          outputSelection.exit().remove();


          // Builds the cells for one row of the "Joins/Lookups" table.
          //
          // d3 invokes this with `this` bound to the row element. The stats shown
          // for that element are stored in `lastJoinStats`, so the following
          // refresh can derive per-second rates from the difference.
          //
          // stats: stats object of one joined or lookup table partition, tagged
          //        with `type`, `topic` and `partition`. It is stamped in place
          //        with `Now`, the time of rendering.
          // Returns the row's inner HTML as a string (nothing for a null entry).
          var updateJoinStats = function(stats) {

            // stateless processors don't have table stats
            if (stats == null) {
              return;
            }

            stats.Now = new Date();

            var status = statusMap[stats.Status];
            var runMode = runModeMap[stats.RunMode];

            // a stalled partition is reported as such, whatever its status code
            if (stats.Stalled) {
              status = "stalled";
            }

            // presumably RecoveryTime is empty while recovery has not finished
            // (TODO confirm); in that case measure the duration up to stats.Now
            var recEndTime = stats.Recovery.RecoveryTime != "" ? stats.Recovery.RecoveryTime : stats.Now;
            var recoveryTime = (new Date(recEndTime) - new Date(stats.Recovery.StartTime)) / 1000.0;
            var recoveryRate = 0;
            var recoveryLag = stats.Recovery.Hwm - stats.Recovery.Offset;

            var readRate = 0;
            var readBytes = 0;

            // let's check if we have a previous dataset and set them to their
            // correct values
            var lastStats = lastJoinStats.get(this);

            if (lastStats) {
              // diff to previous stats in seconds
              var timeDiff = (stats.Now - lastStats.Now) / 1000.0;

              // All rates need a positive interval, the recovery rate included:
              // dividing by a zero interval would render "NaN" or "Infinity".
              if (timeDiff > 0) {
                if (status == "recovering") {
                  recoveryRate = (stats.Recovery.Offset - lastStats.Recovery.Offset) / timeDiff;
                }
                readRate = (stats.Input.Count - lastStats.Input.Count) / timeDiff;
                readBytes = (stats.Input.Bytes - lastStats.Input.Bytes) / timeDiff;
              }
            }
            // remember the current stats for the next round
            lastJoinStats.set(this, stats);

            return '<td>' + stats.type + '</td>\n' +
              '<td>' + stats.topic + '</td>\n' +
              '<td>' + stats.partition + '</td>\n' +
              '<td>' + status + " (" + runMode + ')</td>\n' +
              '<td>' + recoveryTime + 's</td>\n' +
              '<td>' + recoveryRate.toFixed(0) + '</td>\n' +
              '<td>' + recoveryLag + '</td>\n' +
              '<td>' + readRate.toFixed(0) + '</td>\n' +
              '<td>' + readBytes.toFixed(0) + '</td>\n' +
              '<td>' + stats.Input.OffsetLag.toFixed(0) + '</td>\n' +
              // Delay is divided by the nanoseconds-per-second constant to show seconds
              '<td>' + (stats.Input.Delay / secsToNano).toFixed(2) + 's</td>\n';
          };

          // Tags a stats object in place with the values shown in the first three
          // columns of the "Joins/Lookups" table and returns it.
          var tagRow = function(row, type, topic, partId) {
            row['topic'] = topic;
            row['partition'] = partId;
            row['type'] = type;
            return row;
          };

          // joins: partitions.Group[<partition>].Joined[<topic>]
          var joinedList = _.flatMap(partitions.Group, function(partition, partId) {
            return _.map(partition.Joined, function(p, topic) {
              return tagRow(p, 'join', topic, partId);
            });
          });

          // lookups: partitions.Lookup[<topic>].Partitions[<partition>]
          var lookupList = _.flatMap(partitions.Lookup, function(table, topic) {
            return _.map(table.Partitions, function(p, partId) {
              return tagRow(p, 'lookup', topic, partId);
            });
          });

          // one list for the table, ordered by type, then topic, then partition;
          // each criterion only decides when all previous ones compare equal
          var mergedLists = joinedList.concat(lookupList);
          mergedLists.sort(function(a, b) {
            return a.type.localeCompare(b.type) ||
              a.topic.localeCompare(b.topic) ||
              (a.partition - b.partition);
          });

          // refresh the existing rows, add the missing ones, drop the stale ones
          var joinSelection = d3.select("#joinStats").selectAll(".partitionbox").data(mergedLists);
          joinSelection.html(updateJoinStats);
          joinSelection.enter().append("tr").classed("partitionbox", true).html(updateJoinStats);
          joinSelection.exit().remove();


        };

        // Load and render the processor stats right away, and again every two
        // seconds. Wrapped in a function scope to keep the helpers local.
        (function() {
          var statsUrl = "{{.base_path}}/data/processor/{{.vars.idx}}";

          var refresh = function() {
            d3.json(statsUrl, renderDetails);
          };

          window.setInterval(refresh, 2000);

          // call it initially
          refresh();
        })();
      </script>
    </div>
  </div>
</div>
  {{end}}